package biz

import (
	"context"
	"fmt"
	segmentioKafka "github.com/segmentio/kafka-go"
	v1 "kratos_kafka/api/helloworld/v1"
)

func (uc *GreeterUsecase) SendMsgToKafkaBySegHashPartition(ctx context.Context, req *v1.Empty) (*v1.Empty, error) {

	userIDs := []string{"user1", "user1", "user1", "user1", "user2", "user3", "user4"}

	for idx, userId := range userIDs {
		fmt.Println("userId: ", userId)

		// 发送消息
		err := uc.broker.SegKafkaMyTopic.WriteMessages(
			ctx,
			segmentioKafka.Message{
				// Notice 上面在初始化 writer 的时候指定了 Balancer, 是用msg的Key做判断的～
				Key:   []byte(userId),
				Value: []byte(fmt.Sprintf("Hello-%v-%v", idx, userId)),
			})
		if err != nil {
			fmt.Println("Failed to send message:", err)
		} else {
			fmt.Println("Message sent successfully")
		}
	}

	return &v1.Empty{}, nil
}
